package com.lecoboy.highconcurrency.connectionDemo.thread_pool_demo;

import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

/**
 * 简单的线程池
 * <p>
 * 成千上万的任务递交给服务器，频繁的创建销毁线程，CPU频繁进行线程上下文切换，无故增加系统的负载，浪费系统资源
 * <p>
 * 线程池很好的解决了这个问题，它预先创建若干数量的线程，并且不能由用户直接对线程的创建进行控制，
 * 在这个前提下重复使用固定或较为固定数目的线程来完成任务的执行。这样做的好处是，一方面，消除了频繁创建和消亡线程的系统资源开销，
 * 另一方面，面对过量任务的提交能够平缓的劣化。
 *
 * <p>
 * 客户端可以通过execute(Job)方法将Job提交入线程池执行，而客户端自身不用等待Job的执行完成。
 * <p>
 * 除了execute(Job)方法以外，线程池接口提供了增大/减少工作者线程以及关闭线程池的方法。
 * <p>
 * AbstractThreadPoolTest 是为了不破坏原始结构作者做的测试抽象类
 *
 * @param <Job>
 */
public class DefaultThreadPool<Job extends Runnable> extends AbstractThreadPoolTest implements ThreadPool<Job> {
    private AtomicInteger successCount=new AtomicInteger();
    private AtomicInteger errorCount=new AtomicInteger();



    //线程池最大限制数
    private static final int MAX_WORKER_NUMBERS = 10;
    //线程池默认的数量
    private static final int DEFAULT_WORKER_NUMBERS = 5;

    //线程池最小的数量
    private static final int MIN_WORKER_NUMBERS = 1;
    //这是一个工作列表，将会向里面插入工作
    private final LinkedList<Job> jobList = new LinkedList<Job>();
    //工作者列表
    private final List<Worker> workers = Collections.synchronizedList(new ArrayList<Worker>());

    //工作者线程的数量
    private int workerNum = DEFAULT_WORKER_NUMBERS;
    //线程编号生成
    private AtomicLong threadNum = new AtomicLong();

    public DefaultThreadPool() {
        initalizeWokders(DEFAULT_WORKER_NUMBERS);
    }

    public DefaultThreadPool(int num) {
        //确保创建线程数在限制以内
        workerNum = num > MAX_WORKER_NUMBERS ? MAX_WORKER_NUMBERS : num < MIN_WORKER_NUMBERS ? MIN_WORKER_NUMBERS : num;
        initalizeWokders(workerNum);
    }

    /**
     * 执行任务
     *
     * @param job
     */
    @Override
    public void execute(Job job) {
        if (job != null) {
            synchronized (jobList) {
                jobList.addLast(job);

                //这里是notify()，而不是notifyAll()，因为能够确定有公祖者线程唤醒，
                // 这时使用notify()方法将会比notifyAll()方法获得更小的开销
                // （避免将等待队列中的线程全部移动到阻塞对列中）
                jobList.notify();

                //test
                icrWorker();
            }
        }
    }

    /**
     * 取消任务
     */
    @Override
    public void shutdown() {
        for (Worker worker : workers) {
            worker.shutdown();
        }

    }

    @Override
    public void addWorkers(int num) {
        synchronized (jobList) {
            //限制新增的Worker数量不能超过最大值
            if (num + this.workerNum > MAX_WORKER_NUMBERS) {
                num = MAX_WORKER_NUMBERS - this.workerNum;

            }
            initalizeWokders(num);
            this.workerNum += num;
        }
    }

    @Override
    public void removeWorker(int num) {
        synchronized (jobList) {
            if (num >= this.workerNum) {
                throw new IllegalArgumentException("beyond workNum");
            }
            //按照给定的数量停止Worker
            int count = 0;
            while (count < num) {
                Worker worker = workers.get(count);
                if (workers.remove(worker)) {
                    worker.shutdown();
                    count++;
                }
            }
            this.workerNum -= count;
        }
    }

    @Override
    public int getJobSize() {
        return jobList.size();
    }

    //********* test 可有可无*********

    @Override
    public void getExecteResult() {
        super.getExecteResult();
        System.out.println(successCount.getAndIncrement());
    }
    //********** end ************

    /**
     * 初始化工作者
     *
     * @param num
     */
    private void initalizeWokders(int num) {
        for (int i = 0; i < num; i++) {
            Worker worker = new Worker(successCount,errorCount);
            workers.add(worker);
            Thread thread = new Thread(worker, "ThreadPool-Worker-" + threadNum.incrementAndGet());
            thread.start();
        }
    }

    /**
     * 工作者线程 负责消费任务
     */
    class Worker implements Runnable {
        //是否工作
        private volatile boolean running = true;

        AtomicInteger successCount;
        AtomicInteger errorCount;

        Worker(){

        }
        Worker(AtomicInteger successCount,AtomicInteger errorCount){
            this.successCount = successCount;
            this.errorCount = errorCount;
        }

        @Override
        public void run() {
            while (running) {

                Job job;
                //通过jobList的notify()来通知这里，因为jobList解锁后这里才能进行获取锁
                synchronized (jobList) {
                    //如果工作者列表是空的，那么久wait
                    while (jobList.isEmpty()) {
                        try {
                            jobList.wait();
                        } catch (InterruptedException e) {
                            //感知到外部对WorkerThread的中断操作，返回
                            Thread.currentThread().interrupt();
                            return;
                        }
                    }

                    //取出一个Job
                    job = jobList.removeFirst();


                }
                //日志打点
                icrRun();

                if (job != null) {
                    try {
                        //测试  稍微睡10Ms 能测出来不同数量任务执行成功的个数
                        //TimeUnit.MICROSECONDS.sleep(1);
                        job.run();
                        //测试
                        icrSuccessCount();
                        safeCount(successCount);

                    } catch (Exception ex) {
                        //忽略Job执行中的Exception

                    }
                }
            }
        }

        public void shutdown() {
            running = false;
        }
    }
}
